package com.gvsoft.communication.net;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;

import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.gvsoft.communication.common.IConfig;
import com.gvsoft.communication.common.Tools;
import com.gvsoft.communication.net.packethandle.IPacketHandle;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Created with IntelliJ IDEA.
 * ProjectName:MService
 * Author: zhaoqiubo
 * Date: 15/11/20
 * Time: 上午10:05
 * Desc: 任务分发器
 */
public class Dispatcher implements Closeable {

    private static Logger logger = LogManager.getLogger("net");
    /**
     * 存储所有报文处理类的实例，一次性初始化进入Map（为连接通道准备的单例类Map）
     */
    private Map<String, IPacketHandle> phMap = new ConcurrentHashMap<>();
    /**
     * 网络监听器数组,用于NIO的服务端
     */
    private ChannelListener[] listeners;
    /**
     * 服务配置类
     */
    private IConfig cfg;

    /**
     * 针对接收到的报文进行处理的线程池,报文处理执行器
     */
    private ExecutorService readHandleExecutor;

    /**
     * 服务端使用,网络监听器索引
     */
    private AtomicInteger listenerIndex = new AtomicInteger(0);
    /**
     * 客户端使用,连接成功阻塞栓
     */
    private CountDownLatch connectLatch = new CountDownLatch(1);
    /**
     * 客户端使用,客户端网络监听器
     */
    private ChannelListener listener;

    SocketChannel channel;

    ServerSocketChannel serverSocketChannel;

    public Dispatcher(IConfig cfg) {
        this.cfg = cfg;
    }

    /**
     * 启动NIO消息服务
     *
     * @throws IOException 启动过程失败,则返回IOException
     */
    public void serverStart() throws IOException {
        //打开一个serversocket通道，ServerSocketChannel是一个监控是否有新连接进入的通道。
        serverSocketChannel = ServerSocketChannel.open();
        //将这个serversokect通道设置为非阻塞模式
        serverSocketChannel.configureBlocking(false);
        //绑定serversokect的ip和端口
        serverSocketChannel.socket().bind(new InetSocketAddress(cfg.getIp(), cfg.getPort()));
        //启动多路复用器
        listeners = new ChannelListener[cfg.getListenerCount()];
        for (int i = 0; i < cfg.getListenerCount(); i++) {
            listeners[i] = new ChannelListener(this, "NO." + (i + 1) + " LISTENER", cfg);
            if (i == 0) {
                serverSocketChannel.register(listeners[i].getSelector(), SelectionKey.OP_ACCEPT);
            }
            listeners[i].start();
            logger.info(listeners[i].getName() + " Starting…………");
        }
        //根据配置启动报文处理线程池
        readHandleExecutor = new ThreadPoolExecutor(cfg.getReadHandleThreadCount()
                , cfg.getReadHandleThreadCount(), 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(cfg.getReadHandleThreadCount() * 30));
        logger.info("通讯服务已经启动IP：" + cfg.getIp() + "，端口："
                + cfg.getPort() + "，【1】号监听器开始接收客户端连接…………");
    }

    /**
     * 读取packet处理类配置
     *
     * @param pHandleCfgStr packet处理类配置字符串
     */
    public void loadPacketHandleCls(String pHandleCfgStr) {
        if (StringUtils.isEmpty(pHandleCfgStr)) {
            logger.error("Failed to load the PacketHandle model! ");
        } else {
            String[] handleOuterArray =
                    Tools.str2ArrayByChar(pHandleCfgStr, "|");
            try {
                for (String handleOuterStr : handleOuterArray) {
                    String[] handleInnerArray = Tools.str2ArrayByChar(handleOuterStr, ":");
                    Class c = Class.forName(handleInnerArray[1]);
                    Object handleClass = c.newInstance();
                    this.phMap.put(handleInnerArray[0], (IPacketHandle) handleClass);
                }
            } catch (Exception e) {
                logger.error("Failed to load the PacketHandle model! reason:" + e.getMessage());
                e.printStackTrace();
            }
        }
    }

    /**
     * 客户端启动
     * @param nSocket NSocket对象
     */
    public void clientStart(NSocket nSocket ) {
        InetSocketAddress serverAddress = new InetSocketAddress(cfg.getIp(), cfg.getPort());
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            listener = new ChannelListener(this, nSocket.getUserId() + " LISTENER", cfg);
            SelectionKey key = channel.register(listener.getSelector(), SelectionKey.OP_CONNECT, nSocket);
            nSocket.reCreate(channel,this,cfg.getIoAdapter(),key,cfg.getSocketStatusHandle());
            nSocket.setNew();
            listener.start();
            channel.connect(serverAddress);
            //根据配置启动报文处理线程池
            readHandleExecutor = new ThreadPoolExecutor(cfg.getReadHandleThreadCount()
                    , cfg.getReadHandleThreadCount(), 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(cfg.getReadHandleThreadCount() * 30));
            if (this.connectLatch.await(10, TimeUnit.SECONDS)) {
                logger.info(nSocket.getUserId() + "连接通道为:" + nSocket.getChannel());
            } else {
                logger.info(nSocket.getUserId() + "连接超时,开始重连……");
                nSocket.setReconnect();
            }
        } catch (Exception e) {
            logger.error("client start error:"+e.getMessage());
            nSocket.close();
        }


    }


    public void connectLatchDown() {
        connectLatch.countDown();
    }

    /**
     * 从报文处理实例池中取出操作实例
     *
     * @param classKey 报文处理类的标记key
     * @return 返回当前key字所对应的报文处理类对象
     */
    public IPacketHandle getPacketHandleInstance(String classKey) {
        return phMap.get(classKey);
    }

    /**
     * 注册一个新类型的packet处理类
     *
     * @param key    类型标记
     * @param handle 处理类实例
     */
    public void regPacketHandleCls(String key, IPacketHandle handle) {
        phMap.put(key, handle);
    }

    /**
     * 获取新的selector分配给新连接
     */
    public ChannelListener getNextListener() {
        if (cfg.getListenerCount() <= 1) {
            return listeners[0];
        } else {
            return listeners[this.listenerIndex.getAndIncrement() % cfg.getListenerCount()];
        }
    }

    public void close() {
        if (readHandleExecutor != null) {
            readHandleExecutor.shutdownNow();
        }
        if (listeners != null) {
            for (ChannelListener listener : listeners) {
                listener.interrupt();
            }
        }
        if (listener != null) {
            listener.interrupt();
        }
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                logger.error("ServerSocketChannel close error:" + e.getMessage());
            }
        }
        if (serverSocketChannel != null) {
            logger.info("GVServer消息路由服务关闭！");
            try {
                serverSocketChannel.close();
            } catch (IOException e) {
                logger.error("ServerSocketChannel close error:" + e.getMessage());
            }
        }
    }


    /**
     * 启动读取处理线程
     */
    public void execReadHandle(Runnable runnable) {
        readHandleExecutor.submit(runnable);
    }
}
